package hotnet.service;

import hotnet.config.DefaultIoSocketSessionConfig;
import hotnet.config.IoSessionConfig;
import hotnet.config.SocketSessionConfig;
import hotnet.exception.ExceptionMonitor;
import hotnet.future.AcceptorFuture;
import hotnet.processor.IoProcessor;
import hotnet.processor.SimpleIoProcessorPool;
import hotnet.session.IoSession;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractPollingIoAcceptor extends AbstractIoService implements SocketAcceptor {
	/** Acceptor runnable installed by {@link #startAcceptor()}; {@code null} when none is installed. */
	private final AtomicReference<AcceptorRunnable> acceptorRef = new AtomicReference<AcceptorRunnable>();
	/** Monitor held by {@link #startAcceptor()} while it creates and installs a runnable. */
	private final Object acceptorLock = new Object();
	/** Monitor held by {@link #bind(List)} while a bind request is being processed. */
	private final Object bindLock = new Object();
	/**
	 * Single-permit semaphore: acquired in {@link #startAcceptor()} before a runnable is launched and
	 * released by that runnable when it begins running; bind0/unbind0 acquire it before waking the acceptor.
	 */
	private final Semaphore lock = new Semaphore(1);
	/** Processor passed to {@code accept(...)} for accepted connections and disposed by the acceptor loop. */
	private final IoProcessor processor;
	/** Loop flag for the acceptor runnable; set to {@code true} by {@link #initAcceptor()}. */
	private volatile boolean selectable;
	/** Pending bind requests, drained by the acceptor loop. */
	private final Queue<AcceptorFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorFuture>();
	/** Pending unbind requests, drained by the acceptor loop. */
	private final Queue<AcceptorFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorFuture>();
	/** Currently bound server channels keyed by local address (synchronized wrapper around a HashMap). */
	private final Map<SocketAddress, ServerSocketChannel> boundChannels = Collections.synchronizedMap(new HashMap<SocketAddress, ServerSocketChannel>());
	/** Addresses used by the no-argument {@link #bind()}. */
	private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>();
	/** Value reported by {@link #isReuseAddress()}; defaults to {@code true}. */
	private boolean reuseAddress = true;
	/** Value reported by {@link #getBacklog()}; defaults to 1024. */
	private int backlog = 1024;
	
	private static final Logger logger = LoggerFactory.getLogger(AbstractPollingIoAcceptor.class);
	
	/**
	 * Creates the acceptor, builds a {@link SimpleIoProcessorPool} for the given processor class and
	 * runs {@link #initAcceptor()}.
	 *
	 * @param config session configuration supplied by the caller.
	 *        NOTE(review): this argument is currently not used; a fresh
	 *        {@link DefaultIoSocketSessionConfig} is always handed to the superclass. Confirm that is intended.
	 * @param processorCls processor implementation class passed to the processor pool
	 * @throws IllegalStateException if {@link #initAcceptor()} fails; the original failure is kept as the cause
	 */
	public AbstractPollingIoAcceptor(IoSessionConfig config, Class<? extends IoProcessor> processorCls) {
		super(new DefaultIoSocketSessionConfig());
		
		this.processor = new SimpleIoProcessorPool(processorCls);
		
		try {
			initAcceptor();
		} catch (Exception e) {
			// Chain the cause so the original stack trace is not lost.
			throw new IllegalStateException("start polling error:" + e.getMessage(), e);
		}
	}
	
	/** Subclass initialization hook; invoked by {@link #initAcceptor()} before the acceptor is marked selectable. */
	protected abstract void init() throws Exception;
	/**
	 * Accepts a connection on the given server channel. The acceptor loop calls this for every selected
	 * handle and skips the handle when {@code null} is returned.
	 */
	protected abstract IoSession accept(IoProcessor processor, ServerSocketChannel handle) throws Exception;
	/** Closes a server channel; used when a bind request fails part-way and when an address is unbound. */
	protected abstract void close(ServerSocketChannel handle) throws Exception;
	/** Performs one selection round; the loop processes {@link #selectedHandles()} only when the result is positive. */
	protected abstract int select() throws Exception;
	/** Wakes the acceptor loop; called after bind/unbind requests have been queued and on disposal. */
	protected abstract void wakeupAcceptor();
	/** Opens a server channel for the given local address; the result is stored as the bound channel for that address. */
	protected abstract ServerSocketChannel open(SocketAddress localAddress) throws Exception;
	/** Returns the handles selected by the last {@link #select()}; the loop calls {@code remove()} on this iterator. */
	protected abstract Iterator<ServerSocketChannel> selectedHandles();
	
	/**
	 * Runs the subclass {@link #init()} hook and then marks this acceptor as selectable, which is the
	 * condition the acceptor loop checks on every iteration.
	 *
	 * @throws Exception whatever {@link #init()} throws; in that case the selectable flag is left untouched
	 */
	public void initAcceptor() throws Exception {
		init();
		
		selectable = true;
	}

    /**
     * Disposal hook: unbinds every bound address, makes sure an acceptor runnable is installed and
     * wakes it up.
     *
     * NOTE(review): {@link #unbind()} delegates to {@link #unbind(List)}, which throws
     * IllegalStateException when isDisposing() is true or isActive() is false. If this hook is invoked
     * after the service has been flagged as disposing, the first statement would throw before the
     * acceptor is woken. Confirm against the superclass.
     *
     * @throws Exception if unbinding fails or starting the acceptor is interrupted
     */
    public void dispose0() throws Exception {
        unbind();
        
        startAcceptor();
        
        wakeupAcceptor();
    }
    
    /**
     * Binds to the wildcard address on the given port.
     *
     * @param port local port to listen on
     * @throws IOException declared by {@link #bind(List)}
     * @throws IllegalStateException if the acceptor is disposing
     */
    public void bind(int port) throws IOException {
        if (isDisposing()) {
            throw new IllegalStateException("Already disposed.");
        }

        // Wrap the single address in a mutable one-element list and delegate.
        SocketAddress wildcard = new InetSocketAddress(port);
        bind(new ArrayList<SocketAddress>(Collections.singletonList(wildcard)));
    }
	
	/**
	 * Binds to a single local address.
	 *
	 * @param localAddress address to listen on
	 * @throws IOException declared by {@link #bind(List)}
	 * @throws IllegalStateException if the acceptor is disposing
	 */
	public void bind(SocketAddress localAddress) throws IOException {
		if (isDisposing()) {
			throw new IllegalStateException("Already disposed.");
		}

		// Wrap the single address in a mutable one-element list and delegate.
		bind(new ArrayList<SocketAddress>(Collections.singletonList(localAddress)));
	}
	
	/**
	 * Binds to the configured default local addresses.
	 *
	 * @throws IOException declared by {@link #bind(List)}
	 * @throws IllegalStateException if no default address has been configured
	 */
	public void bind() throws IOException {
		if (defaultLocalAddresses.isEmpty()) {
			throw new IllegalStateException("set default address");
		}

		bind(defaultLocalAddresses);
	}
	
	/**
	 * Binds to every address in the list and, when nothing was bound beforehand, notifies listeners
	 * that the service has been activated.
	 *
	 * @param list local addresses to bind
	 * @throws IOException if the underlying bind reports an I/O failure
	 * @throws IllegalStateException if the acceptor is disposing or no handler has been set
	 * @throws RuntimeException wrapping any other failure; the message names the requested addresses
	 */
	public void bind(List<SocketAddress> list) throws IOException {
		
		boolean activate = false;
		synchronized (bindLock) {
			if (isDisposing()) {
				throw new IllegalStateException("Already disposed.");
			}
			
			// Checked outside the try so this precondition failure is not rewrapped as a bind failure.
			if (getHandler() == null) {
				throw new IllegalStateException("handler is not set");
			}
			
			// Activation is only announced for the transition from "nothing bound" to "bound".
			activate = boundChannels.isEmpty();
			
			try {
				bind0(list);
			} catch (IOException e) {
				throw e;
			} catch (InterruptedException e) {
				// Preserve the interrupt for callers further up the stack.
				Thread.currentThread().interrupt();
				throw new RuntimeException("Failed to bind to: " + list, e);
			} catch (Exception e) {
				// Report the addresses that were requested, not the ones already bound.
				throw new RuntimeException("Failed to bind to: " + list, e);
			}
		}
		
		if (activate) {
			getListenerManager().fireServiceActivated();
		}
	}
	
	/**
	 * Queues a bind request, makes sure an acceptor runnable is installed, wakes it and waits for the
	 * request to be completed.
	 *
	 * @param localAddresses addresses to bind
	 * @throws InterruptedException if interrupted while starting the acceptor, waiting for the permit or pausing
	 * @throws Exception wrapping the failure recorded on the request, if any
	 */
	private void bind0(List<SocketAddress> localAddresses) throws Exception {
		AcceptorFuture acceptorFuture = new AcceptorFuture(localAddresses);
		
		registerQueue.add(acceptorFuture);
		
		/* start acceptor runnable */
		startAcceptor();
		
		/*
		 * Wait until the runnable has released the permit. The acquire stays outside the try block:
		 * if it is interrupted, no permit is held and releasing one would inflate the semaphore.
		 */
		lock.acquire();
		try {
			// NOTE(review): the pause presumably lets the acceptor reach select() before it is woken; confirm.
			Thread.sleep(100);
			wakeupAcceptor();
		} finally {
			lock.release();
		}
		
		acceptorFuture.awaitUninterruptibly();
		
		if (acceptorFuture.getException() != null) {
			throw new Exception(acceptorFuture.getException());
		}
	}
	
	/**
	 * Unbinds every currently bound address by passing a snapshot of the bound keys to
	 * {@link #unbind(List)}.
	 */
	public void unbind() {
		unbind(new ArrayList<SocketAddress>(boundChannels.keySet()));
	}
	
	/**
	 * Unbinds a single local address.
	 *
	 * @param localAddress address to stop listening on
	 * @throws IllegalStateException if the acceptor is disposing
	 */
	public void unbind(SocketAddress localAddress) {
		if (isDisposing()) {
			throw new IllegalStateException("closed acceptor");
		}

		// Wrap the single address in a mutable one-element list and delegate.
		unbind(new ArrayList<SocketAddress>(Collections.singletonList(localAddress)));
	}
	
	/**
	 * Unbinds the given local addresses.
	 *
	 * @param localAddressList addresses to stop listening on
	 * @throws IllegalStateException if the acceptor is disposing or not active
	 * @throws RuntimeException wrapping any checked failure, with that failure kept as the cause
	 */
	public void unbind(List<SocketAddress> localAddressList) {
		if (isDisposing()) {
			throw new IllegalStateException("closed acceptor");
		}
		
		if (isActive() == false) {
			throw new IllegalStateException("unactive acceptor");
		}
		
		try {
			unbind0(localAddressList);
			
		} catch (RuntimeException e) {
			throw e;
		} catch (InterruptedException e) {
			// Preserve the interrupt for callers further up the stack.
			Thread.currentThread().interrupt();
			throw new RuntimeException("unbind exception", e);
		} catch (Exception e) {
			// Chain the cause so the reason for the failure is not lost.
			throw new RuntimeException("unbind exception", e);
		}

	}
	
	/**
	 * Queues an unbind request, wakes the acceptor and waits for the request to be completed. When no
	 * channel remains bound afterwards, listeners are told the service has been deactivated.
	 *
	 * @param localAddresses addresses to unbind
	 * @throws InterruptedException if interrupted while waiting for the permit or pausing
	 * @throws Exception wrapping the failure recorded on the request, if any
	 */
	private void unbind0(List<SocketAddress> localAddresses) throws Exception {
		AcceptorFuture cancelAcceptorFuture = new AcceptorFuture(localAddresses);
		
		cancelQueue.add(cancelAcceptorFuture);
		
		/*
		 * Wait until the runnable has released the permit. The acquire stays outside the try block:
		 * if it is interrupted, no permit is held and releasing one would inflate the semaphore.
		 */
		lock.acquire();
		try {
			// NOTE(review): the pause presumably lets the acceptor reach select() before it is woken; confirm.
			Thread.sleep(100);
			wakeupAcceptor();
		} finally {
			lock.release();
		}
		
		cancelAcceptorFuture.awaitUninterruptibly();
		
		if (cancelAcceptorFuture.getException() != null) {
			throw new Exception(cancelAcceptorFuture.getException());
		}
		
		if (boundChannels.isEmpty() == true) {
			getListenerManager().fireServiceDeactivated();
		}
	}
	
	/**
	 * Installs and launches an acceptor runnable when {@code acceptorRef} currently holds none.
	 *
	 * Under {@code acceptorLock} the semaphore permit is taken first, then a new runnable is created and
	 * installed with compare-and-set. On success the runnable is handed to {@code startRunService} and
	 * releases the permit itself when it begins running; on failure the permit is released here.
	 *
	 * @throws InterruptedException if interrupted while waiting for the semaphore permit
	 */
	public void startAcceptor() throws InterruptedException {
		AcceptorRunnable runnable = acceptorRef.get();
		
		if (runnable == null) {
			synchronized (acceptorLock) {
				lock.acquire();	// released by AcceptorRunnable.run() once it starts
				runnable = new AcceptorRunnable();
				
				if (acceptorRef.compareAndSet(null, runnable)) {
					startRunService(runnable);
				} else {
					// another runnable was installed in the meantime; hand the permit back
					lock.release();
				}
			}
		}
	}
	
	/**
	 * Processes queued bind requests until one succeeds or the queue is empty.
	 *
	 * For each request every address is opened; on success the channels are added to the bound map,
	 * the request is completed with {@code Boolean.TRUE} and the number of new channels is returned
	 * immediately. On failure the exception is recorded on the request, the channels opened so far
	 * are closed, and the next request is tried.
	 *
	 * @return number of channels bound by the first successful request, or 0 when none succeeded
	 */
	private int register() {
		AcceptorFuture request;
		
		while ((request = registerQueue.poll()) != null) {
			List<SocketAddress> requested = request.getLocalAddresses();
			Map<SocketAddress, ServerSocketChannel> opened = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
			
			try {
				for (SocketAddress address : requested) {
					opened.put(address, open(address));
				}
				
				boundChannels.putAll(opened);
				request.setValue(Boolean.TRUE);
				
				return opened.size();
			} catch (Exception failure) {
				request.setException(failure);
			} finally {
				// Roll back: a failed request must not leave half of its channels open.
				if (request.getException() != null) {
					for (ServerSocketChannel leftover : opened.values()) {
						try {
							close(leftover);
						} catch (Exception closeFailure) {
							ExceptionMonitor.getInstance().exceptionCaught(closeFailure);
						}
					}
				}
			}
		}
		
		return 0;
	}
	
	/**
	 * Processes every queued unbind request: removes each requested address from the bound map,
	 * closes its channel and completes the request so the waiting caller is released.
	 *
	 * Addresses that are not currently bound are skipped and not counted, so the caller's count of
	 * bound channels cannot go negative.
	 *
	 * @return number of channels that were actually removed from the bound map
	 */
	private int unregister() {
		int unbound = 0;
		
		for (;;) {
			AcceptorFuture future = cancelQueue.poll();
			
			if (future == null) {
				break;
			}
			
			try {
				for (SocketAddress a : future.getLocalAddresses()) {
					ServerSocketChannel channel = boundChannels.remove(a);
					
					if (channel == null) {
						// Nothing was bound to this address; there is no channel to close or count.
						continue;
					}
					
					try {
						close(channel);
					} catch (Exception e) {
						ExceptionMonitor.getInstance().exceptionCaught(e);
					} finally {
						// The channel left the bound map even if closing it failed.
						unbound++;
					}
				}
				
				// Complete the request the same way register() does, so the waiter in unbind0 returns.
				future.setValue(Boolean.TRUE);
			} catch (RuntimeException e) {
				// e.g. a request without an address list: fail the request instead of leaving it pending
				future.setException(e);
			}
		}
		
		return unbound;
	}
	
	/**
	 * Returns the default local addresses used by the no-argument {@link #bind()}.
	 *
	 * @return the internal list itself, not a copy; changes made through it are seen by this acceptor
	 */
	public final List<SocketAddress> getDefaultLocalAddresses() {
		return this.defaultLocalAddresses;
	}
	
	/**
	 * Returns a snapshot of the addresses that are currently bound.
	 *
	 * @return a new set that is independent of the internal bound-channel map
	 */
	public Set<SocketAddress> getLocalAddresses() {
		// Copying iterates the key view; a synchronized map requires holding its monitor while a view
		// is traversed, otherwise a concurrent bind/unbind can fail the iteration.
		synchronized (boundChannels) {
			return new HashSet<SocketAddress>(this.boundChannels.keySet());
		}
	}

	
	/**
	 * Sets the default local addresses used by the no-argument {@link #bind()}. The defaults can only
	 * be set while none are configured.
	 *
	 * @param localAddresses addresses to copy into the default list; must not be {@code null}
	 * @throws IllegalArgumentException if {@code localAddresses} is {@code null} or defaults are already set
	 */
	public final void setDefaultLocalAddresses(List<SocketAddress> localAddresses) {
		if (localAddresses == null) {
			throw new IllegalArgumentException("localAddresses must not be null");
		}
		
		// Lock the list being modified (not the caller's argument) so check-and-fill is atomic.
		synchronized (this.defaultLocalAddresses) {
			if (!this.defaultLocalAddresses.isEmpty()) {
				throw new IllegalArgumentException("default local adress had set before");
			}
			
			this.defaultLocalAddresses.addAll(localAddresses);
		}
	}
	
	/** Returns the stored reuse-address flag (initially {@code true}). */
	@Override
	public boolean isReuseAddress() {
		return reuseAddress;
	}

	/**
	 * Stores the reuse-address flag.
	 *
	 * @param reuseAddress new flag value
	 * @throws IllegalArgumentException if the acceptor is active
	 */
	@Override
	public void setReuseAddress(boolean reuseAddress) {
		if (isActive()) {
			throw new IllegalArgumentException("reuse flag cannot be set in active acceptor");
		}
		
		this.reuseAddress = reuseAddress;
	}

	/** Returns the stored backlog value (initially 1024). */
	@Override
	public int getBacklog() {
		return this.backlog;
	}

	/**
	 * Stores the backlog value.
	 *
	 * @param backlog new backlog value
	 * @throws IllegalArgumentException if the acceptor is active
	 */
	@Override
	public void setBacklog(int backlog) {
		if (isActive()) {
			throw new IllegalArgumentException("backlog cannot be set in active acceptor");
		}
		
		this.backlog = backlog;
	}
	
	/**
	 * Returns the default session configuration viewed as a socket session configuration.
	 *
	 * @return the default session config cast to {@link SocketSessionConfig}
	 * @throws IllegalArgumentException if the default session config is not a {@link SocketSessionConfig}
	 */
	@Override
	public SocketSessionConfig getSocketSessionConfig() {
		IoSessionConfig sessionConfig = getDefaultSessionConfig();
		
		if (!(sessionConfig instanceof SocketSessionConfig)) {
			throw new IllegalArgumentException("not socketconfig");
		}
		
		return (SocketSessionConfig) sessionConfig;
	}

	
	/**
	 * Acceptor loop. Each iteration selects, applies queued bind requests, accepts connections on the
	 * selected handles and applies queued unbind requests. The loop ends when nothing is bound and no
	 * request is pending, when the selector is closed, or when the acceptor stops being selectable.
	 */
	private class AcceptorRunnable implements Runnable {
		@Override
		public void run() {
			/* start run acceptor: hand back the permit taken in startAcceptor() */
			lock.release();
			
			int nBounds = 0;
			
			while (selectable) {
				try {
					int selected = select();
					nBounds += register();
					
					/* no bind listen socket to accept */
					if (nBounds == 0) {
						/*
						 * Clear the reference BEFORE the final queue check so that a later
						 * startAcceptor() launches a fresh runnable instead of queueing work for
						 * a thread that has already exited.
						 */
						acceptorRef.set(null);
						
						if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
							break;
						}
						
						/* a request arrived meanwhile: keep running unless a new runnable took over */
						if (!acceptorRef.compareAndSet(null, this)) {
							break;
						}
					}
					
					if (selected > 0) {
						processBounds(selectedHandles());
					}
					
					/* delete unbind request */
					nBounds -= unregister();
				} catch (ClosedSelectorException e) {
					ExceptionMonitor.getInstance().exceptionCaught(e);
					break;
				} catch (Exception e) {
					/* report the failure instead of dropping it, then back off for 1s */
					ExceptionMonitor.getInstance().exceptionCaught(e);
					
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e2) {
						ExceptionMonitor.getInstance().exceptionCaught(e2);
					}
				}
				
			}
			
			if (selectable && isDisposing()) {
				selectable = false;
				
				/* close processor */
				processor.dispose();
			}
		}
		
		/**
		 * Accepts one connection per selected handle, initializes the resulting session and adds it to
		 * its processor. Handles for which accept returns {@code null} are skipped.
		 *
		 * @param handles iterator over the selected server channels; each element is removed once taken
		 * @throws Exception if accepting or initializing a session fails
		 */
		private void processBounds(Iterator<ServerSocketChannel> handles) throws Exception {
			while (handles.hasNext()) {
				ServerSocketChannel handle = handles.next();
				handles.remove();
				
				// Associates a new created connection to a processor,
				// and get back a session
				IoSession session = accept(processor, handle);
				
				if (session == null) {
					continue;
				}
				
				initSession(session, null);
				
				// add the session to the SocketIoProcessor
				logger.info("new session:{} processor:{}", session.getSessionId(), session.getProcessor());
				session.getProcessor().add(session);
			}
		}
	}

}
